package com.cloudansys.core.flink.sink;

import cn.hutool.core.date.DateUtil;
import com.cloudansys.config.DefaultConfig;
import com.cloudansys.core.constant.Const;
import com.cloudansys.core.entity.MultiDataEntity;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.influxdb.InfluxDB;
import org.influxdb.dto.Point;

import java.util.List;
import java.util.Locale;
import java.util.concurrent.TimeUnit;

/**
 * Flink sink that writes batches of {@link MultiDataEntity} records to InfluxDB,
 * one point per record.
 *
 * <p>The measurement name is {@code Const.INFLUX_MEASUREMENT_BASE_MEAN} followed by the
 * lower-cased type tag of the record. Every point is tagged with the project id and the
 * serial code, and carries one double field per value of the record.
 */
@Slf4j
public class MeanInfluxSink extends RichSinkFunction<List<MultiDataEntity>> {

    /**
     * Client used by this sink instance. Held per instance (not static) so that closing one
     * sink instance does not close a handle that another instance in the same JVM still
     * uses; transient because it is obtained in {@link #open(Configuration)} and must not
     * be part of the serialized function.
     */
    private transient InfluxDB influxDB;

    /**
     * Obtains the InfluxDB client from {@link DefaultConfig}.
     *
     * @param parameters configuration passed in by Flink
     * @throws Exception if the parent initialization or the client lookup fails
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        influxDB = DefaultConfig.getInfluxDB();
    }

    /**
     * Writes every record of the batch as one InfluxDB point.
     *
     * <p>A {@code null} or empty batch is ignored. Records that do not yield any field
     * (see {@link #getPoint(MultiDataEntity)}) are skipped.
     *
     * @param element batch of records to persist
     * @param context sink context supplied by Flink (unused)
     * @throws Exception if building or writing a point fails
     */
    @Override
    public void invoke(List<MultiDataEntity> element, Context context) throws Exception {
        // Nothing to write; also protects the element.get(0) used by the log statement below.
        if (element == null || element.isEmpty()) {
            return;
        }
        for (MultiDataEntity multiDataEntity : element) {
            Point point = getPoint(multiDataEntity);
            if (point != null) {
                influxDB.write(point);
            }
        }
        log.info("##### save to influx success【mean:{}】#####", element.get(0).getTypeTag());
    }

    /**
     * Closes the InfluxDB client obtained in {@link #open(Configuration)}.
     *
     * @throws Exception if the parent cleanup fails
     */
    @Override
    public void close() throws Exception {
        try {
            super.close();
        } finally {
            // Release the client even when the parent cleanup throws.
            if (influxDB != null) {
                influxDB.close();
                influxDB = null;
            }
        }
    }

    /**
     * Builds the InfluxDB point for one record.
     *
     * <p>Fields are named {@code Const.INFLUX_MEASUREMENT_FIELD_BASE} followed by the 1-based
     * index of the value. {@code null} values are left out, so the numbering of the
     * remaining fields stays positional.
     *
     * @param multiDataEntity record to convert
     * @return the point, or {@code null} when the record has no non-null value and therefore
     *         no field to store
     */
    private Point getPoint(MultiDataEntity multiDataEntity) {
        String projectId = multiDataEntity.getProjectId();
        String serialCode = multiDataEntity.getSerialCode();
        // Locale.ROOT keeps the measurement name independent of the JVM default locale.
        String targetType = multiDataEntity.getTypeTag().toLowerCase(Locale.ROOT);
        String pickTime = multiDataEntity.getPickTime();
        Double[] values = multiDataEntity.getValues();
        long timestamp = DateUtil.parse(pickTime, Const.FMT_TRIM_MILLI).getTime();
        Point.Builder pointBuilder = Point.measurement(Const.INFLUX_MEASUREMENT_BASE_MEAN + targetType)
                // Timestamps are always written with millisecond precision.
                .time(timestamp, TimeUnit.MILLISECONDS)
                .tag(Const.PID, projectId)
                .tag(Const.SC, serialCode);
        int fieldCount = 0;
        if (values != null) {
            for (int j = 0; j < values.length; j++) {
                Double value = values[j];
                // Unboxing a null Double would throw a NullPointerException; skip the slot.
                if (value == null) {
                    continue;
                }
                String fieldName = Const.INFLUX_MEASUREMENT_FIELD_BASE + (j + 1);
                // Values are always stored as double.
                pointBuilder.addField(fieldName, value.doubleValue());
                fieldCount++;
            }
        }
        if (fieldCount == 0) {
            // An InfluxDB point needs at least one field, so there is nothing to store.
            log.warn("skip influx point without field values, serialCode:{}, pickTime:{}", serialCode, pickTime);
            return null;
        }
        return pointBuilder.build();
    }

}
